package com.tpvlog.dfs.datanode.server;

import com.tpvlog.dfs.datanode.client.NameNodeRpcClient;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * Worker thread that performs the disk I/O for file requests.
 *
 * <p>The thread repeatedly polls the global {@link NetworkRequestQueue}. For an upload
 * request it writes the received content to a local file, reports the new replica to the
 * NameNode and enqueues a {@code "SUCCESS"} response. For a download request it reads the
 * local file and enqueues a response made of an 8-byte length header followed by the file
 * content. Responses are handed to {@link NetworkResponseQueues} keyed by the processor id
 * carried in the request.
 *
 * <p>The thread terminates when it is interrupted.
 *
 * @author Ressmix
 */
public class IOThread extends Thread {
    /** Request type: file upload. */
    public static final Integer REQUEST_SEND_FILE = 1;
    /** Request type: file download. */
    public static final Integer REQUEST_READ_FILE = 2;

    /** Size in bytes of the response header that carries the file length. */
    private static final int LENGTH_HEADER_BYTES = 8;

    /** How long to sleep, in milliseconds, when the request queue is empty. */
    private static final long IDLE_SLEEP_MILLIS = 100L;

    /** Global request queue shared with the network layer. */
    private final NetworkRequestQueue requestQueue = NetworkRequestQueue.getInstance();

    private final NameNodeRpcClient rpcClient;

    /**
     * Creates the worker.
     *
     * @param rpcClient client used to report newly stored files to the NameNode
     */
    public IOThread(NameNodeRpcClient rpcClient) {
        super();
        this.rpcClient = rpcClient;
    }

    /**
     * Polls the request queue and dispatches each request by type until the thread is
     * interrupted. Failures while handling a single request are printed and do not stop
     * the loop.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 1. Keep pulling NetworkRequests off the global request queue.
                NetworkRequest request = requestQueue.poll();
                if (request == null) {
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                    continue;
                }

                // Constant-first comparison so a null request type is ignored, not an NPE.
                Integer requestType = request.getRequestType();
                if (REQUEST_SEND_FILE.equals(requestType)) {
                    writeFileToLocalDisk(request);
                } else if (REQUEST_READ_FILE.equals(requestType)) {
                    readFileFromLocalDisk(request);
                }
            } catch (InterruptedException e) {
                // Restore the flag and stop: interruption is the shutdown signal.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Writes the uploaded content to the local file named by the request, reports the new
     * replica to the NameNode and enqueues a {@code "SUCCESS"} response.
     *
     * <p>Any failure is printed; in that case no response is enqueued.
     *
     * @param request the upload request carrying the file name, size and content
     */
    private void writeFileToLocalDisk(NetworkRequest request) {
        // try-with-resources closes the channel and stream even when opening or writing fails.
        try (FileOutputStream localFileOut = new FileOutputStream(request.getFilename());
             FileChannel localFileChannel = localFileOut.getChannel()) {
            // 1. Write to disk.
            // NOTE(review): FileOutputStream(String) truncates an existing file, so size() is 0
            // here and this positions at the start. If appending was intended, the stream would
            // need to be opened in append mode -- confirm against the upload protocol.
            localFileChannel.position(localFileChannel.size());
            System.out.println("对本地磁盘文件定位到position=" + localFileChannel.size());

            // A single write() call may not drain the buffer; loop until it is empty.
            ByteBuffer content = request.getFileContent();
            int written = 0;
            while (content.hasRemaining()) {
                written += localFileChannel.write(content);
            }
            System.out.println("本次文件上传完毕，将" + written + " bytes的数据写入本地磁盘文件.......");

            // 2. Incremental report to the NameNode.
            rpcClient.deltaReportDataNodeInfo(request.getFilename(), request.getFilesize());
            System.out.println("增量上报收到的文件副本给NameNode节点......");

            // 3. Build the response and hand it to the processor that owns this request.
            NetworkResponse response = new NetworkResponse();
            response.setClient(request.getClient());
            response.setBuffer(ByteBuffer.wrap(
                    "SUCCESS".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
            NetworkResponseQueues.getInstance().offer(request.getProcessorId(), response);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the local file named by the request and enqueues a response whose buffer is an
     * 8-byte length header followed by the file content.
     *
     * <p>Any failure (missing file, file too large for a single buffer, I/O error) is
     * printed; in that case no response is enqueued.
     *
     * @param request the download request carrying the file name
     */
    private void readFileFromLocalDisk(NetworkRequest request) {
        try (FileInputStream localFileIn = new FileInputStream(request.getFilename());
             FileChannel localFileChannel = localFileIn.getChannel()) {
            // Take the size from the open channel so it refers to the file actually being read.
            long fileLength = localFileChannel.size();
            if (fileLength > Integer.MAX_VALUE - LENGTH_HEADER_BYTES) {
                throw new IOException("File too large to return in a single buffer: "
                        + request.getFilename() + " (" + fileLength + " bytes)");
            }

            // Response buffer: 8-byte header (file size) + file content.
            ByteBuffer buffer = ByteBuffer.allocate(LENGTH_HEADER_BYTES + (int) fileLength);
            buffer.putLong(fileLength);

            // A single read() call may return fewer bytes than requested; loop until the
            // buffer is full or the channel reports end-of-file.
            int hasReadImageLength = 0;
            while (buffer.hasRemaining()) {
                int count = localFileChannel.read(buffer);
                if (count < 0) {
                    break;
                }
                hasReadImageLength += count;
            }
            System.out.println("从本次磁盘文件中读取了" + hasReadImageLength + " bytes的数据");

            // rewind() resets the position but keeps limit == capacity, so the number of
            // bytes exposed always matches the length announced in the header.
            buffer.rewind();

            // Build the response and put it on the response queue of the owning processor.
            NetworkResponse response = new NetworkResponse();
            response.setClient(request.getClient());
            response.setBuffer(buffer);
            NetworkResponseQueues.getInstance().offer(request.getProcessorId(), response);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
